{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Lab 2 - Social Network Analysis with GraphX\n",
    "The following steps demonstrate how to use GraphX to make a base graph and apply functions to it."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Throughout this assignment we will use the following property graph that makes a small social network with users and their ages modeled as vertices and likes modeled as directed edges.\n",
    "\n",
    "<img src=\"figs/social_graph.png\" width=\"500\">\n",
    "\n",
    "We begin by creating the property graph from arrays of vertices and edges. Later we will demonstrate how to load real data. Here we use the `Edge` class. Edges have a `srcId` and a `dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge` class has an `attr` member that stores the edge property (in this case the number of likes). Use `sc.parallelize` to construct the following RDDs from the `vertexArray` and `edgeArray` variables, and then build a property graph. The basic property graph constructor takes an RDD of vertices (with type `RDD[(VertexId, V)]`) and an RDD of edges (with type `RDD[Edge[E]]`) and builds a graph (with type `Graph[V, E]`). "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Intitializing Scala interpreter ..."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Spark Web UI available at http://a05cbddd16bc:4040\n",
       "SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1570375791782)\n",
       "SparkSession available as 'spark'\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "import org.apache.spark.graphx._\n",
       "import org.apache.spark.rdd.RDD\n",
       "vertexArray: Array[(Long, (String, Int))] = Array((1,(Alice,28)), (2,(Bob,27)), (3,(Charlie,65)), (4,(David,42)), (5,(Ed,55)), (6,(Fran,50)))\n",
       "edgeArray: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(2,1,7), Edge(2,4,2), Edge(3,2,4), Edge(3,6,3), Edge(4,1,1), Edge(5,2,2), Edge(5,3,8), Edge(5,6,3))\n",
       "vertexRDD: org.apache.spark.rdd.RDD[(Long, (String, Int))] = ParallelCollectionRDD[0] at parallelize at <console>:47\n",
       "edgeRDD: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = ParallelCollectionRDD[1] at parallelize at <console>:48\n",
       "graph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@5728248b\n"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.spark.graphx._\n",
    "import org.apache.spark.rdd.RDD\n",
    " \n",
    "val vertexArray = Array(\n",
    "  (1L, (\"Alice\", 28)),\n",
    "  (2L, (\"Bob\", 27)),\n",
    "  (3L, (\"Charlie\", 65)),\n",
    "  (4L, (\"David\", 42)),\n",
    "  (5L, (\"Ed\", 55)),\n",
    "  (6L, (\"Fran\", 50))\n",
    "  )\n",
    "val edgeArray = Array(\n",
    "  Edge(2L, 1L, 7),\n",
    "  Edge(2L, 4L, 2),\n",
    "  Edge(3L, 2L, 4),\n",
    "  Edge(3L, 6L, 3),\n",
    "  Edge(4L, 1L, 1),\n",
    "  Edge(5L, 2L, 2),\n",
    "  Edge(5L, 3L, 8),\n",
    "  Edge(5L, 6L, 3)\n",
    "  )\n",
    "\n",
    "val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)\n",
    "val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)\n",
    "val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In many cases we will want to extract the vertex and edge RDD views of a graph. As a consequence, the graph class contains members (`graph.vertices` and `graph.edges`) to access the vertices and edges of the graph. While these members extend `RDD[(VertexId, V)`] and `RDD[Edge[E]]` they are actually backed by optimized representations that leverage the internal GraphX representation of graph data. Below, use `graph.vertices` to display the names of the users that are at least 30 years old. The output should contain:\n",
    "```\n",
    "David is 42\n",
    "Fran is 50\n",
    "Ed is 55\n",
    "Charlie is 65\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "David is 42\n",
      "Fran is 50\n",
      "Charlie is 65\n",
      "Ed is 55\n"
     ]
    }
   ],
   "source": [
    "graph.vertices.filter{ case (id, (name, age)) => age >= 30 }.collect.foreach(v => println(s\"${v._2._1} is ${v._2._2}\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, use the `graph.triplets` view to display who likes who. The output should look like:\n",
    "```\n",
    "Bob likes Alice\n",
    "Bob likes David\n",
    "Charlie likes Bob\n",
    "Charlie likes Fran\n",
    "David likes Alice\n",
    "Ed likes Bob\n",
    "Ed likes Charlie\n",
    "Ed likes Fran\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "David likes Alice\n",
      "Ed likes Bob\n",
      "Ed likes Charlie\n",
      "Bob likes Alice\n",
      "Bob likes David\n",
      "Charlie likes Bob\n",
      "Charlie likes Fran\n",
      "Ed likes Fran\n"
     ]
    }
   ],
   "source": [
    "for (triplet <- graph.triplets) {\n",
    "  println(s\"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}\")\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If someone likes someone else more than 5 times than that relationship is getting pretty serious. For extra credit, find the lovers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Bob loves Alice\n",
      "Ed loves Charlie\n"
     ]
    }
   ],
   "source": [
    "for (triplet <- graph.triplets.filter(t => t.attr > 5)) {\n",
    "    println(s\"${triplet.srcAttr._1} loves ${triplet.dstAttr._1}\")\n",
    "}\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can compute the in-degree of the graph using the `graph.inDegrees` operators that returns a `VertexRDD[Int]`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "inDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[25] at RDD at VertexRDD.scala:57\n"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val inDegrees: VertexRDD[Int] = graph.inDegrees"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, let's incorporate the in and out degree of each vertex into the vertex property. To do this, we first define a `User` class to better organize the vertex property and build a new graph with the user property. We initialized each vertex with 0 in and out degree. Then, we join the in and out degree information with each vertex building the new vertex property. Here we use the `outerJoinVertices` method of `Graph` that takes two argument lists: (i) an RDD of vertex values, and (ii) a function from the id, attribute, and Optional matching value in the RDD to a new vertex value. The `outerJoinVertices` has the following type signature:\n",
    "```scala\n",
    "def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])\n",
    "    (mapFunc: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED]\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "import org.apache.spark.graphx._\n",
       "import org.apache.spark.rdd.RDD\n",
       "defined class User\n",
       "initialUserGraph: org.apache.spark.graphx.Graph[User,Int] = org.apache.spark.graphx.impl.GraphImpl@742ebc3a\n",
       "userGraph: org.apache.spark.graphx.Graph[User,Int] = org.apache.spark.graphx.impl.GraphImpl@37a9fb8e\n"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.spark.graphx._\n",
    "import org.apache.spark.rdd.RDD\n",
    "\n",
    "// Define a class to more clearly model the user property\n",
    "case class User(name: String, age: Int, inDeg: Int, outDeg: Int)\n",
    "\n",
    "// Create a user Graph\n",
    "val initialUserGraph: Graph[User, Int] = graph.mapVertices{ case (id, (name, age)) => User(name, age, 0, 0) }\n",
    "\n",
    "// Fill in the degree information\n",
    "val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {\n",
    "  case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)\n",
    "}.outerJoinVertices(initialUserGraph.outDegrees) {\n",
    "  case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))\n",
    "}"
   ]
  },
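  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the outer-join semantics concrete, here is a minimal sketch that uses plain Scala collections instead of GraphX. The helper `outerJoinLocal` and the sample maps `names` and `inDegTable` are hypothetical names introduced only for illustration; they are not part of the GraphX API. The sketch shows that every vertex is kept and that a vertex with no matching entry receives `None`, which is why the lab code calls `getOrElse(0)`.\n",
    "```scala\n",
    "// Plain Scala collections only; no Spark required.\n",
    "// Hypothetical local data: a few vertex attributes and a partial table of in-degrees.\n",
    "val names: Map[Long, String] = Map(1L -> \"Alice\", 2L -> \"Bob\", 5L -> \"Ed\")\n",
    "val inDegTable: Map[Long, Int] = Map(1L -> 2, 2L -> 2) // Ed (5) has no entry\n",
    "\n",
    "// Outer-join semantics: every vertex is kept; the joined value arrives as an Option.\n",
    "def outerJoinLocal[V, U, V2](vs: Map[Long, V], other: Map[Long, U])(f: (Long, V, Option[U]) => V2): Map[Long, V2] =\n",
    "  vs.map { case (id, attr) => id -> f(id, attr, other.get(id)) }\n",
    "\n",
    "// Pair each name with its in-degree, defaulting to 0 when the table has no entry.\n",
    "val joined: Map[Long, (String, Int)] = outerJoinLocal(names, inDegTable) {\n",
    "  (id, name, degOpt) => (name, degOpt.getOrElse(0))\n",
    "}\n",
    "```"
   ]
  },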
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Using the `degreeGraph` print the number of people who like each user:\n",
    "```\n",
    "User 1 is called Alice and is liked by 2 people.\n",
    "User 2 is called Bob and is liked by 2 people.\n",
    "User 3 is called Charlie and is liked by 1 people.\n",
    "User 4 is called David and is liked by 1 people.\n",
    "User 5 is called Ed and is liked by 0 people.\n",
    "User 6 is called Fran and is liked by 2 people.\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "User 1 is called Alice and is liked by 2 people.\n",
      "User 2 is called Bob and is liked by 2 people.\n",
      "User 3 is called Charlie and is liked by 1 people.\n",
      "User 4 is called David and is liked by 1 people.\n",
      "User 5 is called Ed and is liked by 0 people.\n",
      "User 6 is called Fran and is liked by 2 people.\n"
     ]
    }
   ],
   "source": [
    "for ((id, property) <- userGraph.vertices.sortBy(_._1)) {\n",
    "    println(s\"User $id is called ${property.name} and is liked by ${property.inDeg} people.\")\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Print the names of the users who are liked by the same number of people they like."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "David\n",
      "Bob\n"
     ]
    }
   ],
   "source": [
    "// TODO: Replace <FILL IN> with appropriate code\n",
    "userGraph.vertices.filter {\n",
    "  case (id, u) => u.inDeg == u.outDeg\n",
    "}.foreach {\n",
    "  case (id, property) => println(property.name)\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Suppose we want to find the oldest follower of each user, using the above property graph. The `aggregateMessages` operator allows us to do this. This operator applies a user defined `sendMsg` function to each edge triplet in the graph and then uses the `mergeMsg` function to aggregate those messages at their destination vertex.\n",
    "```scala\n",
    "class Graph[VD, ED] {\n",
    "  def aggregateMessages[Msg: ClassTag](\n",
    "      sendMsg: EdgeContext[VD, ED, Msg] => Unit,\n",
    "      mergeMsg: (Msg, Msg) => Msg,\n",
    "      tripletFields: TripletFields = TripletFields.All)\n",
    "    : VertexRDD[Msg]\n",
    "}\n",
    "```\n",
    "Bellow, you can find the oldest follower for each user by sending a message containing the name and age of each follower and aggregating the messages by taking the message from the older follower."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "oldestFollower: org.apache.spark.graphx.VertexRDD[(String, Int)] = VertexRDDImpl[119] at RDD at VertexRDD.scala:57\n"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Find the oldest follower for each user\n",
    "val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](\n",
    "  // sendMsg\n",
    "  triplet => { triplet.sendToDst(triplet.srcAttr.name, triplet.srcAttr.age) },\n",
    "  // mergeMsg\n",
    "  (a, b) => if (a._2 > b._2) a else b\n",
    ")"
   ]
  },
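  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough mental model of what `aggregateMessages` computes, the sketch below reproduces the same send-then-merge pattern with plain Scala collections. It is not how GraphX executes the operator; the names `users`, `likes`, `messages`, and `oldestLocal` are hypothetical local stand-ins for the lab's data. Each edge emits one message keyed by its destination, and messages that share a destination are reduced pairwise.\n",
    "```scala\n",
    "// Plain Scala collections only; no Spark required.\n",
    "// Hypothetical local copies of the lab's data: id -> (name, age), and (src, dst) edges.\n",
    "val users = Map(1L -> (\"Alice\", 28), 2L -> (\"Bob\", 27), 3L -> (\"Charlie\", 65),\n",
    "                4L -> (\"David\", 42), 5L -> (\"Ed\", 55), 6L -> (\"Fran\", 50))\n",
    "val likes = Seq((2L, 1L), (2L, 4L), (3L, 2L), (3L, 6L), (4L, 1L), (5L, 2L), (5L, 3L), (5L, 6L))\n",
    "\n",
    "// \"sendMsg\": every edge emits the follower's (name, age), keyed by the destination id.\n",
    "val messages: Seq[(Long, (String, Int))] = likes.map { case (src, dst) => dst -> users(src) }\n",
    "\n",
    "// \"mergeMsg\": reduce all messages that reach the same destination, keeping the older follower.\n",
    "val oldestLocal: Map[Long, (String, Int)] =\n",
    "  messages.groupBy(_._1).map { case (dst, msgs) =>\n",
    "    dst -> msgs.map(_._2).reduce((a, b) => if (a._2 > b._2) a else b)\n",
    "  }\n",
    "\n",
    "// Vertices that received no message (here Ed, id 5) are absent from the result.\n",
    "```\n",
    "The absence of Ed from `oldestLocal` mirrors the `VertexRDD` returned by `aggregateMessages`, which only contains vertices that received at least one message."
   ]
  },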
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Display the oldest follower for each user as bellow. Note that some users may have no messages.\n",
    "```\n",
    "David is the oldest follower of Alice.\n",
    "Charlie is the oldest follower of Bob.\n",
    "Ed is the oldest follower of Charlie.\n",
    "Bob is the oldest follower of David.\n",
    "Ed does not have any followers.\n",
    "Charlie is the oldest follower of Fran.\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "David is the oldest follower of Alice.\n",
      "Charlie is the oldest follower of Bob.\n",
      "Ed is the oldest follower of Charlie.\n",
      "Bob is the oldest follower of David.\n",
      "Ed does not have any followers.\n",
      "Charlie is the oldest follower of Fran.\n"
     ]
    }
   ],
   "source": [
    "userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>\n",
    "  optOldestFollower match {\n",
    "    case None => s\"${user.name} does not have any followers.\"\n",
    "    case Some((name, age)) => s\"${name} is the oldest follower of ${user.name}.\"\n",
    "  }\n",
    "}.collect.sortBy(_._1).foreach { case (id, str) => println(str) }"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, find the average follower age of the followers of each user."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The average age of David's followers is 27.0.\n",
      "The average age of Fran's followers is 60.0.\n",
      "The average age of Bob's followers is 60.0.\n",
      "The average age of Alice's followers is 34.5.\n",
      "The average age of Charlie's followers is 55.0.\n",
      "Ed does not have any followers.\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "averageAge: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[127] at RDD at VertexRDD.scala:57\n"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val averageAge: VertexRDD[Double] = userGraph.aggregateMessages[(Int, Double)](\n",
    "  // map function returns a tuple of (1, Age)\n",
    "  triplet => { triplet.sendToDst(1, triplet.srcAttr.age.toDouble) },\n",
    "  // reduce function combines (sumOfFollowers, sumOfAge)\n",
    "  (a, b) => ((a._1 + b._1), (a._2 + b._2))\n",
    ").mapValues((id, p) => p._2 / p._1)\n",
    "\n",
    "// Display the results\n",
    "userGraph.vertices.leftJoin(averageAge) { (id, user, optAverageAge) =>\n",
    "  optAverageAge match {\n",
    "    case None => s\"${user.name} does not have any followers.\"\n",
    "    case Some(avgAge) => s\"The average age of ${user.name}\\'s followers is $avgAge.\"\n",
    "  }\n",
    "}.collect.foreach { case (id, str) => println(str) }"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Suppose you want to study the community structure of users that are 30 or older. To support this type of analysis GraphX includes the `subgraph` operator that takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. \n",
    "\n",
    "In the following example, restrict your graph to the users that are 30 or older, and  examine the communities in this restricted graph. Connected components are labeled (numbered) by the lowest vertex Id in that component. Notice that by examining the subgraph you have disconnected David from the rest of his community. Moreover his connections to the rest of the graph are through younger users."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "David is in component 4\n",
      "Fran is in component 3\n",
      "Charlie is in component 3\n",
      "Ed is in component 3\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "olderGraph: org.apache.spark.graphx.Graph[User,Int] = org.apache.spark.graphx.impl.GraphImpl@e07a864\n",
       "cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@d99fa04\n"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val olderGraph = userGraph.subgraph(vpred = (id, user) => user.age >= 30)\n",
    "\n",
    "// compute the connected components\n",
    "val cc = olderGraph.connectedComponents\n",
    "\n",
    "// display the component id of each user:\n",
    "olderGraph.vertices.leftJoin(cc.vertices) {\n",
    "  case (id, user, comp) => s\"${user.name} is in component ${comp.get}\"\n",
    "}.collect.foreach{ case (id, str) => println(str) }"
   ]
  },
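  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The effect of the vertex predicate on the edges can be sketched with plain Scala collections. This is only an illustration of the `subgraph` semantics described above, not the GraphX implementation; `ages`, `allEdges`, `keptVertices`, and `keptEdges` are hypothetical local names. An edge survives only when both of its endpoints pass the vertex predicate, which is why David ends up isolated.\n",
    "```scala\n",
    "// Plain Scala collections only; no Spark required.\n",
    "// Hypothetical local copies of the lab's data: id -> age, and (src, dst) edges.\n",
    "val ages = Map(1L -> 28, 2L -> 27, 3L -> 65, 4L -> 42, 5L -> 55, 6L -> 50)\n",
    "val allEdges = Seq((2L, 1L), (2L, 4L), (3L, 2L), (3L, 6L), (4L, 1L), (5L, 2L), (5L, 3L), (5L, 6L))\n",
    "\n",
    "// Vertex predicate: keep users aged 30 or older.\n",
    "val keptVertices = ages.filter { case (_, age) => age >= 30 }\n",
    "\n",
    "// An edge survives only if both of its endpoints survive the vertex predicate.\n",
    "val keptEdges = allEdges.filter { case (src, dst) =>\n",
    "  keptVertices.contains(src) && keptVertices.contains(dst)\n",
    "}\n",
    "```\n",
    "Vertex 4 (David) remains in `keptVertices` but appears in none of the `keptEdges`, since both of his edges touch users under 30."
   ]
  },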
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "spylon-kernel",
   "language": "scala",
   "name": "spylon-kernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "help_links": [
    {
     "text": "MetaKernel Magics",
     "url": "https://metakernel.readthedocs.io/en/latest/source/README.html"
    }
   ],
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "0.4.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
